From e2a63b21732c2d39c58a8a26773817e552917c42 Mon Sep 17 00:00:00 2001 From: "jrb44@plym.cl.cam.ac.uk" Date: Fri, 25 Mar 2005 15:39:09 +0000 Subject: [PATCH] bitkeeper revision 1.1236.43.12 (4244309dO7HfNtv-R6F5S3jdQqQR8A) Enhanced concurrency support in blockstore. Signed-off-by: James Bulpin --- tools/blktap/Makefile | 26 +++---- tools/blktap/blktaplib.c | 15 ++++ tools/blktap/blockstore.c | 126 +++++++++++++++++++++++++------ tools/blktap/parallax-threaded.h | 3 +- 4 files changed, 132 insertions(+), 38 deletions(-) diff --git a/tools/blktap/Makefile b/tools/blktap/Makefile index 7f71a219bf..3478552ac4 100644 --- a/tools/blktap/Makefile +++ b/tools/blktap/Makefile @@ -58,7 +58,7 @@ OBJS = $(patsubst %.c,%.o,$(SRCS)) LIB = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR) -all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax +all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax parallax-threaded blockstored $(MAKE) $(LIB) LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse) @@ -120,42 +120,42 @@ blkaio: $(LIB) blkaio.c blkaiolib.c $(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap blkaio.c blkaiolib.c -laio -lpthread parallax: $(LIB) $(PLX_SRCS) - $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap $(PLX_SRCS) libgnbd/libgnbd.a + $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a parallax-threaded: $(LIB) $(PLXT_SRCS) $(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a vdi_test: $(LIB) $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS) vdi_list: $(LIB) vdi_list.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS) vdi_create: $(LIB) vdi_create.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS) vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS) vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread $(VDI_SRCS) vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread $(VDI_SRCS) vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS) vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS) vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS) - $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS) + $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS) blockstored: blockstored.c - $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c + $(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c bstest: bstest.c blockstore.c - $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c + $(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c .PHONY: TAGS clean install mk-symlinks rpm TAGS: diff --git a/tools/blktap/blktaplib.c b/tools/blktap/blktaplib.c index 35b893f677..87b680d2cc 100644 --- a/tools/blktap/blktaplib.c +++ b/tools/blktap/blktaplib.c @@ -248,12 +248,21 @@ static void apply_rsp_hooks(blkif_response_t *rsp) } } +static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER; + void blktap_inject_response(blkif_response_t *rsp) { + apply_rsp_hooks(rsp); + write_rsp_to_fe_ring(rsp); + + pthread_mutex_lock(&push_mutex); + RING_PUSH_RESPONSES(&fe_ring); ioctl(fd, BLKTAP_IOCTL_KICK_FE); + + pthread_mutex_unlock(&push_mutex); } /*-----[ Polling fd listeners ]------------------------------------------*/ @@ -449,7 +458,9 @@ int blktap_listen(void) } /* Using this as a unidirectional ring. */ ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i; +pthread_mutex_lock(&push_mutex); RING_PUSH_RESPONSES(&ctrl_ring); +pthread_mutex_unlock(&push_mutex); /* empty the fe_ring */ notify_fe = 0; @@ -517,14 +528,18 @@ int blktap_listen(void) if (notify_be) { DPRINTF("notifying be\n"); +pthread_mutex_lock(&push_mutex); RING_PUSH_REQUESTS(&be_ring); ioctl(fd, BLKTAP_IOCTL_KICK_BE); +pthread_mutex_unlock(&push_mutex); } if (notify_fe) { DPRINTF("notifying fe\n"); +pthread_mutex_lock(&push_mutex); RING_PUSH_RESPONSES(&fe_ring); ioctl(fd, BLKTAP_IOCTL_KICK_FE); +pthread_mutex_unlock(&push_mutex); } } } diff --git a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c index 5de2a6885a..36903fe09e 100644 --- a/tools/blktap/blockstore.c +++ b/tools/blktap/blockstore.c @@ -13,13 +13,16 @@ #include #include #include +#include #include #include "blockstore.h" #include #include "parallax-threaded.h" #define BLOCKSTORE_REMOTE -#define BSDEBUG +//#define BSDEBUG + +#define RETRY_TIMEOUT 1000000 /* microseconds */ /***************************************************************************** * Debugging @@ -62,6 +65,37 @@ bscluster_t bsclusters[MAX_CLUSTERS]; struct sockaddr_in sin_local; int bssock = 0; +/***************************************************************************** + * Notification * + *****************************************************************************/ + +typedef struct pool_thread_t_struct { + pthread_mutex_t ptmutex; + pthread_cond_t ptcv; + int newdata; +} pool_thread_t; + +pool_thread_t pool_thread[READ_POOL_SIZE+1]; + +#define RECV_NOTIFY(tid) { \ + pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \ + pool_thread[tid].newdata = 1; \ + DB("CV Waking %u", tid); \ + pthread_cond_signal(&(pool_thread[tid].ptcv)); \ + pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); } +#define RECV_AWAIT(tid) { \ + pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \ + if (pool_thread[tid].newdata) { \ + pool_thread[tid].newdata = 0; \ + DB("CV Woken %u", tid); \ + } \ + else { \ + DB("CV Waiting %u", tid); \ + pthread_cond_wait(&(pool_thread[tid].ptcv), \ + &(pool_thread[tid].ptmutex)); \ + } \ + pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); } + /***************************************************************************** * Message queue management * *****************************************************************************/ @@ -76,23 +110,6 @@ pthread_mutex_t ptmutex_recv; #define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv) #define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv) -int notify = 0; -pthread_mutex_t ptmutex_notify; -pthread_cond_t ptcv_notify; -#define RECV_NOTIFY { \ - pthread_mutex_lock(&ptmutex_notify); \ - notify = 1; \ - pthread_cond_signal(&ptcv_notify); \ - pthread_mutex_unlock(&ptmutex_notify); } -#define RECV_AWAIT { \ - pthread_mutex_lock(&ptmutex_notify); \ - if (notify) \ - notify = 0; \ - else \ - pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \ - pthread_mutex_unlock(&ptmutex_notify); } - - /* A message queue entry. We allocate one of these for every request we send. * Asynchronous reply reception also used one of these. */ @@ -104,6 +121,8 @@ typedef struct bsq_t_struct { int length; struct msghdr msghdr; struct iovec iov[2]; + int tid; + struct timeval tv_sent; bshdr_t message; void *block; } bsq_t; @@ -267,11 +286,13 @@ int send_message(bsq_t *qe) { qe->message.luid = new_luid(); qe->status = 0; + qe->tid = (int)pthread_getspecific(tid_key); if (enqueue(qe) < 0) { fprintf(stderr, "Error enqueuing request.\n"); return -1; } + gettimeofday(&(qe->tv_sent), NULL); DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid); rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT); //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0, @@ -407,6 +428,7 @@ void recv_recycle_buffer(bsq_t *q) { int wait_recv(bsq_t **reqs, int numreqs) { bsq_t *q, *m; unsigned int x, i; + int tid = (int)pthread_getspecific(tid_key); DB("ENTER wait_recv %u\n", numreqs); @@ -420,7 +442,7 @@ int wait_recv(bsq_t **reqs, int numreqs) { return numreqs; } - RECV_AWAIT; + RECV_AWAIT(tid); /* rxagain: @@ -442,6 +464,52 @@ int wait_recv(bsq_t **reqs, int numreqs) { } +/* retry + */ +static int retry_count = 0; +int retry(bsq_t *qe) +{ + int rc; + gettimeofday(&(qe->tv_sent), NULL); + DB("retry to %d luid=%016llx\n", qe->server, qe->message.luid); + retry_count++; + rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT); + if (rc < 0) + return rc; + return 0; +} + +/* queue runner + */ +void *queue_runner(void *arg) +{ + for (;;) { + struct timeval now; + long long nowus, sus; + bsq_t *q; + int r; + + sleep(1); + + gettimeofday(&now, NULL); + nowus = now.tv_usec + now.tv_sec * 1000000; + ENTER_QUEUE_CR; + r = retry_count; + for (q = bs_head; q; q = q->next) { + sus = q->tv_sent.tv_usec + q->tv_sent.tv_sec * 1000000; + if ((nowus - sus) > RETRY_TIMEOUT) { + if (retry(q) < 0) { + fprintf(stderr, "Error on sendmsg retry.\n"); + } + } + } + if (r != retry_count) { + fprintf(stderr, "RETRIES: %u %u\n", retry_count - r, retry_count); + } + LEAVE_QUEUE_CR; + } +} + /* receive loop */ void *receive_loop(void *arg) @@ -461,7 +529,7 @@ void *receive_loop(void *arg) } else { DB("RX MATCH"); - RECV_NOTIFY; + RECV_NOTIFY(m->tid); } } } @@ -1146,8 +1214,12 @@ int __init_blockstore(void) pthread_mutex_init(&ptmutex_queue, NULL); pthread_mutex_init(&ptmutex_luid, NULL); pthread_mutex_init(&ptmutex_recv, NULL); - pthread_mutex_init(&ptmutex_notify, NULL); - pthread_cond_init(&ptcv_notify, NULL); + /*pthread_mutex_init(&ptmutex_notify, NULL);*/ + for (i = 0; i <= READ_POOL_SIZE; i++) { + pool_thread[i].newdata = 0; + pthread_mutex_init(&(pool_thread[i].ptmutex), NULL); + pthread_cond_init(&(pool_thread[i].ptcv), NULL); + } bsservers[0].hostname = "firebug.cl.cam.ac.uk"; bsservers[1].hostname = "planb.cl.cam.ac.uk"; @@ -1225,6 +1297,7 @@ int __init_blockstore(void) } pthread_create(&pthread_recv, NULL, receive_loop, NULL); + pthread_create(&pthread_recv, NULL, queue_runner, NULL); #else /* /BLOCKSTORE_REMOTE */ block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644); @@ -1262,9 +1335,14 @@ int __init_blockstore(void) void __exit_blockstore(void) { + int i; pthread_mutex_destroy(&ptmutex_recv); pthread_mutex_destroy(&ptmutex_luid); pthread_mutex_destroy(&ptmutex_queue); - pthread_mutex_destroy(&ptmutex_notify); - pthread_cond_destroy(&ptcv_notify); + /*pthread_mutex_destroy(&ptmutex_notify); + pthread_cond_destroy(&ptcv_notify);*/ + for (i = 0; i <= READ_POOL_SIZE; i++) { + pthread_mutex_destroy(&(pool_thread[i].ptmutex)); + pthread_cond_destroy(&(pool_thread[i].ptcv)); + } } diff --git a/tools/blktap/parallax-threaded.h b/tools/blktap/parallax-threaded.h index 17cdcb983e..de39609fcc 100644 --- a/tools/blktap/parallax-threaded.h +++ b/tools/blktap/parallax-threaded.h @@ -14,7 +14,8 @@ #define NOTHREADS #endif -#define READ_POOL_SIZE 128 +//#define READ_POOL_SIZE 128 +#define READ_POOL_SIZE 8 /* per-thread identifier */ pthread_key_t tid_key; -- 2.30.2